package cn.doitedu.ml.comment

import java.util

import cn.doitedu.commons.util.SparkUtil
import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.sql.{DataFrame, Dataset}


/**
  * @date: 2020/2/23
  * @site: www.doitedu.cn
  * @author: hunter.d 涛哥
  * @qq: 657270652
  * @description: 用户评论数据分类
  *               算法：
  *               自然语言文本特征提取：TF-IDF
  *               分类算法： naive bayes
  *               分词工具包：HanLp （ik分词器，庖丁分词器，结巴分词）
  */
object CommentClassify {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    val spark = SparkUtil.getSparkSession(this.getClass.getSimpleName)
    import spark.implicits._

    // 加载停用词词典
    val stopwordsDs = spark.read.textFile("userprofile/data/demo/stopwords/stopwords.txt")
    // 收集到driver端并广播出去
    val stopwords = stopwordsDs.collect().toSet
    val bc = spark.sparkContext.broadcast(stopwords)


    /**
      * 1. 加载样本数据
      */
    val cp = spark.read.textFile("G:\\testdata\\comment\\poor")
    val zp = spark.read.textFile("G:\\testdata\\comment\\general")
    val hp = spark.read.textFile("G:\\testdata\\comment\\good")

    // 将3类样本合并到一个集合中，并且要为数据打上对应的类别标签 ==》（label,评论）
    val sample: Dataset[(Double, String)] = cp.map(cmt => (0.0, cmt)) union zp.map(cmt => (1.0, cmt)) union hp.map(cmt => (2.0, cmt))

    /**
      * 2.对样本集提取特征，向量化 ： （label,评论） ==》（label，tfidf特征向量）
      */
    // 2.1 中文分词 ： (1.0，手机收到了，系统用着有些不习惯，另外手机有点厚，这是真事体验。) ==>(1.0,词数组)
    val sampleWordsDF: DataFrame = sample.map(tp => {
      val label = tp._1
      val cmt = tp._2

      // 对评论去燥（去掉标点符号）
      val str = cmt.replaceAll("[\\p{Punct}\\pP]", "")

      // 调用HanLP分词
      val terms: util.List[Term] = HanLP.segment(str)
      import scala.collection.JavaConversions._

      // 对分词结果去燥
      val filtered = terms.filter(term =>
          // 根据HanLP标注的词性过滤介词、助词、语气词、标点
          !term.nature.startsWith("p") &&
          !term.nature.startsWith("u") &&
          !term.nature.startsWith("y") &&
          !term.nature.startsWith("w")
      )
        .filter(term=>{
          // 过滤停用词
          val stpw = bc.value
          !stpw.contains(term.word)
        })

      // 将term数组，转成字符串词数组
      val wordsArray = filtered.map(term => term.word).toArray

      // 返回处理结果
      (label, wordsArray)
    }).toDF("label","words")
    sampleWordsDF.show(20,false)

    // 2.2 将词数组转换为TF特征向量
    val tf = new HashingTF()
        .setNumFeatures(1000000)
        .setInputCol("words")
        .setOutputCol("tfvec")
    val sampleTF_DF = tf.transform(sampleWordsDF)

    // 2.3 将TF向量，转成 TF-IDF向量
    val idf = new IDF()
        .setInputCol("tfvec")
        .setOutputCol("tfidfvec")
    val idf_model = idf.fit(sampleTF_DF)
    val tfidf_DF = idf_model.transform(sampleTF_DF)
    tfidf_DF.drop("tfvec").show(20,false)


    /**
      * 将加工好的样本特征数据，切分成两部分：训练集，测试集
      */
    val Array(train,test) = tfidf_DF.randomSplit(Array(0.9,0.1))

    /**
      * 3.将特征向量数据输入naive bayes算法，训练模型
      */
    val bayes = new NaiveBayes()
        .setLabelCol("label")
        .setFeaturesCol("tfidfvec")
        .setSmoothing(1.0)

    val bayes_mode = bayes.fit(train)

    // 4.保存模型
    bayes_mode.save("userprofile/data/cmt_classfiy_model")



    // 5.加载模型，对测试数据做预测，评估预测效果
    val predict = bayes_mode.transform(test)
    predict.drop("words","tfvec","tfidfvec","probability").show(50,false)


    // 6.评估模型预测的准确率
    val correctCnts = predict.selectExpr("count(1) as total","count(if(label = prediction,1,null)) as correct")
    correctCnts.show(10,false)


    spark.close()

  }
}
